SortShuffleManager — The Default Shuffle System
SortShuffleManager is the one and only shuffle manager in Spark, available under the short names sort and tungsten-sort.
SortShuffleManager uses IndexShuffleBlockResolver (as shuffleBlockResolver internal registry).
| Name | Description |
|---|---|
| | Used when ??? |
Tip: Enable DEBUG logging level for the org.apache.spark.shuffle.sort.SortShuffleManager$ logger. Add the following line to your log4j configuration:
log4j.logger.org.apache.spark.shuffle.sort.SortShuffleManager$=DEBUG
Refer to Logging.
Creating SortShuffleManager Instance
SortShuffleManager takes a SparkConf.
SortShuffleManager checks whether the spark.shuffle.spill Spark property is enabled. If it is not, you should see the following WARN message in the logs:
WARN SortShuffleManager: spark.shuffle.spill was set to false, but this configuration is ignored as of Spark 1.6+. Shuffle will continue to spill to disk when necessary.
SortShuffleManager initializes the internal registries and counters.
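The spark.shuffle.spill check can be sketched as follows. This is a minimal illustration, not Spark's actual constructor: MiniConf is a hypothetical stand-in for SparkConf, SpillCheck is an illustrative name, and the default of true for the property is an assumption.

```scala
// Hypothetical stand-in for SparkConf: a plain key/value map.
final case class MiniConf(settings: Map[String, String]) {
  def getBoolean(key: String, default: Boolean): Boolean =
    settings.get(key).map(_.toBoolean).getOrElse(default)
}

object SpillCheck {
  // Returns the warning to log (if any) when the manager is created.
  // Assumption: the property defaults to true when it is not set.
  def spillWarning(conf: MiniConf): Option[String] =
    if (!conf.getBoolean("spark.shuffle.spill", default = true))
      Some(
        "spark.shuffle.spill was set to false, but this configuration is ignored " +
          "as of Spark 1.6+. Shuffle will continue to spill to disk when necessary.")
    else None
}
```

Only an explicit false produces the warning; in both cases shuffle keeps spilling to disk when necessary.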
Note: SortShuffleManager is created when SparkEnv is created (for the driver and for each executor).
Getting ShuffleHandle — registerShuffle Method
registerShuffle[K, V, C](
shuffleId: Int,
numMaps: Int,
dependency: ShuffleDependency[K, V, C]): ShuffleHandle
Note: registerShuffle is part of the ShuffleManager contract.
registerShuffle returns a new ShuffleHandle that is one of the following:
- BypassMergeSortShuffleHandle (with ShuffleDependency[K, V, V]) when shouldBypassMergeSort condition holds.
- SerializedShuffleHandle (with ShuffleDependency[K, V, V]) when canUseSerializedShuffle condition holds.
- BaseShuffleHandle otherwise.
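The selection above amounts to a simple priority chain. The sketch below is illustrative only: HandleSelection and its three case objects are hypothetical stand-ins for Spark's handle classes, and the two conditions are passed in as already-evaluated flags rather than computed from a ShuffleDependency.

```scala
object HandleSelection {
  // Hypothetical, simplified stand-ins for Spark's ShuffleHandle hierarchy.
  sealed trait Handle
  case object BypassMergeSort extends Handle
  case object Serialized extends Handle
  case object Base extends Handle

  // The conditions are tried in the order they are listed in the text;
  // the first one that holds decides the handle.
  def select(shouldBypassMergeSort: Boolean, canUseSerializedShuffle: Boolean): Handle =
    if (shouldBypassMergeSort) BypassMergeSort
    else if (canUseSerializedShuffle) Serialized
    else Base
}
```

With this ordering, a shuffle that satisfies both conditions gets the bypass handle, and the base handle is the fallback when neither holds.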
Returning ShuffleWriter For ShuffleHandle — getWriter Method
getWriter[K, V](
handle: ShuffleHandle,
mapId: Int,
context: TaskContext): ShuffleWriter[K, V]
Note: getWriter is part of the ShuffleManager contract.
Internally, getWriter makes sure that a ShuffleHandle is associated with its numMaps in numMapsForShuffle internal registry.
Note: getWriter expects that the input handle is of type BaseShuffleHandle (despite the signature that says that it can work with any ShuffleHandle).
getWriter then returns a new ShuffleWriter for the input ShuffleHandle:
- UnsafeShuffleWriter for SerializedShuffleHandle.
- BypassMergeSortShuffleWriter for BypassMergeSortShuffleHandle.
- SortShuffleWriter for BaseShuffleHandle.
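The registry update and the writer selection can be sketched with a pattern match. Everything here is a simplified model, not Spark's code: the handle classes are hypothetical, the writers are represented by their names only, and the assumption that both specialised handles extend the base handle is taken from the note above about getWriter expecting a BaseShuffleHandle.

```scala
import java.util.concurrent.ConcurrentHashMap

object WriterSelection {
  // Hypothetical handle hierarchy: the specialised handles extend the base one.
  class BaseHandle(val shuffleId: Int, val numMaps: Int)
  class SerializedHandle(shuffleId: Int, numMaps: Int) extends BaseHandle(shuffleId, numMaps)
  class BypassMergeSortHandle(shuffleId: Int, numMaps: Int) extends BaseHandle(shuffleId, numMaps)

  // shuffleId -> numMaps, modelled on the numMapsForShuffle registry.
  val numMapsForShuffle = new ConcurrentHashMap[Int, Int]()

  // Returns the name of the writer that would be created for the handle.
  def writerNameFor(handle: BaseHandle): String = {
    // Record numMaps for the shuffle unless it is already registered.
    numMapsForShuffle.putIfAbsent(handle.shuffleId, handle.numMaps)
    // Subclasses are matched first; the base handle is the fallback.
    handle match {
      case _: SerializedHandle      => "UnsafeShuffleWriter"
      case _: BypassMergeSortHandle => "BypassMergeSortShuffleWriter"
      case _                        => "SortShuffleWriter"
    }
  }
}
```

Matching the specialised handles before the base case matters in this model, because every handle is also a BaseHandle.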
shouldBypassMergeSort Method
shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean
shouldBypassMergeSort does not hold (i.e. is false) when the input ShuffleDependency has mapSideCombine flag enabled, because map-side aggregation cannot bypass sorting (an aggregator is required to be defined in that case).
shouldBypassMergeSort holds (i.e. is true) when mapSideCombine flag is disabled and the number of partitions (of the input ShuffleDependency) is not greater than spark.shuffle.sort.bypassMergeThreshold Spark property (200 by default).
Otherwise, shouldBypassMergeSort is negative (i.e. false).
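A minimal sketch of the check, modelled on how it reads in the Spark 2.x sources, where an enabled mapSideCombine flag rules out the bypass path. Dep and BypassCheck are hypothetical names, the configuration is a plain Map rather than SparkConf, and the default threshold of 200 is an assumption to verify against your Spark version.

```scala
object BypassCheck {
  // Hypothetical, trimmed-down view of a ShuffleDependency.
  final case class Dep(mapSideCombine: Boolean, aggregatorDefined: Boolean, numPartitions: Int)

  def shouldBypassMergeSort(conf: Map[String, String], dep: Dep): Boolean =
    if (dep.mapSideCombine) {
      // Map-side aggregation needs sorting, so the bypass path is ruled out.
      require(dep.aggregatorDefined, "Map-side combine without Aggregator specified!")
      false
    } else {
      // Assumed default of 200 when the property is not set.
      val threshold =
        conf.get("spark.shuffle.sort.bypassMergeThreshold").map(_.toInt).getOrElse(200)
      dep.numPartitions <= threshold
    }
}
```

For example, a dependency without map-side combine and with 200 partitions qualifies under the assumed default, while 201 partitions does not.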
Note: shouldBypassMergeSort is exclusively used when registerShuffle selects a ShuffleHandle.
Checking If SerializedShuffleHandle Can Be Used for ShuffleHandle — canUseSerializedShuffle Method
canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean
canUseSerializedShuffle condition holds (i.e. is positive) when all of the following hold (checked in that order):
- The Serializer of the input ShuffleDependency supports relocation of serialized objects.
- The Aggregator of the input ShuffleDependency is not defined.
- The number of partitions of the input ShuffleDependency is not greater than the supported maximum number of partitions (which is 1 << 24, i.e. 16777216, one more than the largest partition id, (1 << 24) - 1, i.e. 16777215).
You should see the following DEBUG message in the logs when canUseSerializedShuffle holds:
DEBUG Can use serialized shuffle for shuffle [id]
Otherwise, canUseSerializedShuffle does not hold and you should see one of the following DEBUG messages:
DEBUG Can't use serialized shuffle for shuffle [id] because the serializer, [name], does not support object relocation
DEBUG Can't use serialized shuffle for shuffle [id] because an aggregator is defined
DEBUG Can't use serialized shuffle for shuffle [id] because it has more than [number] partitions
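The three checks and their DEBUG messages can be sketched as one ordered chain. This is an illustrative model only: Dep and SerializedShuffleCheck are hypothetical names, the result is returned as an Either instead of being logged, and the partition limit is left as a parameter so the caller supplies the value that applies to their Spark version.

```scala
object SerializedShuffleCheck {
  // Hypothetical, trimmed-down view of a ShuffleDependency.
  final case class Dep(
      shuffleId: Int,
      serializerName: String,
      supportsRelocation: Boolean,
      aggregatorDefined: Boolean,
      numPartitions: Int)

  // Right(message) when serialized shuffle can be used, Left(reason) otherwise.
  // The checks run in the order given in the text; the first failure wins.
  def check(dep: Dep, maxPartitions: Int): Either[String, String] = {
    val prefix = s"Can't use serialized shuffle for shuffle ${dep.shuffleId} because"
    if (!dep.supportsRelocation)
      Left(s"$prefix the serializer, ${dep.serializerName}, does not support object relocation")
    else if (dep.aggregatorDefined)
      Left(s"$prefix an aggregator is defined")
    else if (dep.numPartitions > maxPartitions)
      Left(s"$prefix it has more than $maxPartitions partitions")
    else
      Right(s"Can use serialized shuffle for shuffle ${dep.shuffleId}")
  }
}
```

Because the chain stops at the first failing check, only one of the "Can't use" messages is produced for a given shuffle.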
Note: canUseSerializedShuffle is exclusively used when registerShuffle selects a ShuffleHandle.
Settings
| Spark Property | Default Value | Description |
|---|---|---|
| | | No longer in use. When |